Amazon MSK(フルマネージドのKafkaクラスタ)のパブリックプレビューが東京リージョンに来たので試してみました
はじめに
昨年のre:Invent2018で発表されたフルマネージドのKafkaクラスタのサービス、Amazon MSKのパブリックプレビューが東京リージョンに来ました。まだ、パブリックプレビューですが記念に試してみました。
Kafka用クライアントの準備
MSK/kafkaクラスタは、VPC内に構築するためパブリックアクセスできません。そのため、Kafka用クライアントはVPC内にEC2インスタンスを作成する必要があります。Kafka用クライアントには、java-1.8.0とkafka_2.12-2.1.0のインストールは必要です。
Javaのインストール
[ec2-user@ip-10-0-0-168 ~]$sudo yum -y install java-1.8.0 読み込んだプラグイン:priorities, update-motd, upgrade-helper 依存性の解決をしています --> トランザクションの確認を実行しています。 ---> パッケージ java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール --> 依存性の処理をしています: java-1.8.0-openjdk-headless(x86-64) = 1:1.8.0.201.b09-0.43.amzn1 のパッケージ: 1:java-1.8.0-openjdk-1.8.0.201.b09-0.43.amzn1.x86_64 --> トランザクションの確認を実行しています。 ---> パッケージ java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1 を インストール : : インストール: java-1.8.0-openjdk.x86_64 1:1.8.0.201.b09-0.43.amzn1 依存性関連をインストールしました: avahi-libs.x86_64 0:0.6.25-12.17.amzn1 cups-libs.x86_64 1:1.4.2-67.21.amzn1 gnutls.x86_64 0:2.12.23-21.18.amzn1 java-1.8.0-openjdk-headless.x86_64 1:1.8.0.201.b09-0.43.amzn1 jbigkit-libs.x86_64 0:2.0-11.4.amzn1 libtiff.x86_64 0:4.0.3-27.29.amzn1 lksctp-tools.x86_64 0:1.0.10-7.7.amzn1 完了しました! [ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo alternatives --config java 2 プログラムがあり 'java' を提供します。 選択 コマンド ----------------------------------------------- *+ 1 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java 2 /usr/lib/jvm/jre-1.8.0-openjdk.x86_64/bin/java Enter を押して現在の選択 [+] を保持するか、選択番号を入力します:2 [ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ java -version openjdk version "1.8.0_201" OpenJDK Runtime Environment (build 1.8.0_201-b09) OpenJDK 64-Bit Server VM (build 25.201-b09, mixed mode) [ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$
kafka_2.12-2.1.0インストール
このインストールは、Topicの作成やメッセージの送受信用プログラムを利用するためにインストールしました。プロダクションでは必ずしも必要ではありません。
[ec2-user@ip-10-0-0-168 ~]$ wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz --2019-04-30 09:58:10-- https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz archive.apache.org (archive.apache.org) をDNSに問いあわせています... 163.172.17.199 archive.apache.org (archive.apache.org)|163.172.17.199|:443 に接続しています... 接続しました。 HTTP による接続要求を送信しました、応答を待っています... 200 OK 長さ: 55201623 (53M) [application/x-gzip] `kafka_2.12-2.1.0.tgz' に保存中 kafka_2.12-2.1.0.tgz 100%[=======================================================>] 52.64M 6.52MB/s in 9.5s 2019-04-30 09:58:21 (5.56 MB/s) - `kafka_2.12-2.1.0.tgz' へ保存完了 [55201623/55201623] [ec2-user@ip-10-0-0-168 ~]$ ll 合計 53908 -rw-rw-r-- 1 ec2-user ec2-user 55201623 11月 20 19:16 kafka_2.12-2.1.0.tgz [ec2-user@ip-10-0-0-168 ~]$ tar -xzf kafka_2.12-2.1.0.tgz [ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ ll 合計 52 -rw-r--r-- 1 ec2-user ec2-user 32216 11月 9 19:50 LICENSE -rw-r--r-- 1 ec2-user ec2-user 336 11月 9 19:50 NOTICE drwxr-xr-x 3 ec2-user ec2-user 4096 11月 9 19:54 bin drwxr-xr-x 2 ec2-user ec2-user 4096 11月 9 19:54 config drwxr-xr-x 2 ec2-user ec2-user 4096 4月 30 13:23 libs drwxr-xr-x 2 ec2-user ec2-user 4096 11月 9 19:54 site-docs
AWSCLIのアップデート
執筆時点のAMI(ami-00a5245b4816c38e6)にインストールされているAWSCLIは古いため、MSKのサブコマンドがインストールされていませんでしたので、別途AWSCLIをアップデートします。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ sudo pip install -U awscli Collecting awscli Downloading https://files.pythonhosted.org/packages/ec/78/01f03354bf02d4c3d1a0de01cbe77529b79df4ea078284f1172690834586/awscli-1.16.148-py2.py3-none-any.whl (1.5MB) 100% |████████████████████████████████| 1.5MB 802kB/s Collecting colorama<=0.3.9,>=0.2.5 (from awscli) Downloading https://files.pythonhosted.org/packages/db/c8/7dcf9dbcb22429512708fe3a547f8b6101c0d02137acbd892505aee57adf/colorama-0.3.9-py2.py3-none-any.whl Collecting rsa<=3.5.0,>=3.1.2 (from awscli) Downloading : : Found existing installation: botocore 1.10.82 Uninstalling botocore-1.10.82: Successfully uninstalled botocore-1.10.82 Found existing installation: awscli 1.15.83 Uninstalling awscli-1.15.83: Successfully uninstalled awscli-1.15.83 Successfully installed PyYAML-3.13 awscli-1.16.148 botocore-1.12.138 colorama-0.3.9 docutils-0.14 futures-3.2.0 jmespath-0.9.4 pyasn1-0.4.5 python-dateutil-2.8.0 rsa-3.4.2 s3transfer-0.2.0 six-1.12.0 urllib3-1.24.2 You are using pip version 9.0.3, however version 19.1 is available. You should consider upgrading via the 'pip install --upgrade pip' command.
Amazon MSK/kafkaクラスタの構築
構築
今回はマネジメントコンソールからAmazon MSK/kafkaクラスタの構築します。re:Invent2018発表の際にはバージョンは1.1.0のみでしたが、バージョンは2.1.0を指定しました。
3つのアベイラビリティゾーンに作成しなければなりませんので、Kafkaブローカは3つ以上になります。
1つのアベイラビリティゾーンに作成するKafkaブローカを指定できます。
「高度な設定」で、設定をカスタマイズできます。今回は「デフォルト設定の使用」のまま作成しました。
Topicの作成
ZookeeperConnectString の取得
Topic作成時に必要となるZookeeperConnectString
を取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数にarn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3
を指定しました。
aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
なお、コマンドの出力のState
がCREATING
の場合は、ACTIVE
になってからコマンドを再実行してください。結果は、以下のとおりです。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka describe-cluster --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3" { "ClusterInfo": { "EncryptionInfo": { "EncryptionAtRest": { "DataVolumeKMSKeyId": "arn:aws:kms:ap-northeast-1:1234567890123:key/928e86a0-8289-4eb6-8a2d-aff7afdd96b0" } }, "BrokerNodeGroupInfo": { "BrokerAZDistribution": "DEFAULT", "ClientSubnets": [ "subnet-ffbxxxxx", "subnet-8f2xxxxx", "subnet-0c7xxxxxxxxxxxxx" ], "StorageInfo": { "EbsStorageInfo": { "VolumeSize": 1000 } }, "SecurityGroups": [ "sg-5b58b23e" ], "InstanceType": "kafka.m5.large" }, "ClusterName": "classmethod-cluster", "CurrentBrokerSoftwareInfo": { "KafkaVersion": "2.1.0" }, "CreationTime": "2019-04-30T13:21:52.115Z", "NumberOfBrokerNodes": 3, "ZookeeperConnectString": "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181", "State": "ACTIVE", "CurrentVersion": "K1VC38T7YXB528", "ClusterArn": "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3", "EnhancedMonitoring": "DEFAULT" } }
Topicの作成
kafka-topics.shのzookeeper
引数にZookeeperConnectString
の値10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181
を設定してコマンドを実行します。トピック名はClassmethodTopic
を指定しました。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --create --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181" --replication-factor 3 --partitions 1 --topic ClassmethodTopic OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N Created topic "ClassmethodTopic". [ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-topics.sh --list --zookeeper "10.0.5.48:2181,10.0.3.146:2181,10.0.1.135:2181" OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N ClassmethodTopic __consumer_offsets
メッセージの送受信
メッセージの送受信には、先ほどダウンロードしたkafka_2.12-2.1.0のコマンドを用いて動作確認します。ここではkafkaに対してメッセージを送信する役割を「Producer」、kafkaからメッセージを受信する役割を「Consumer」と呼びます。
※ Kafkaは、GoFのデザインパターンProducer-Consumerパターンの用語を用いられることが多いです。
BootstrapBrokerStringの取得
メッセージの送受信に必要となるBootstrapBrokerStringの取得します。この値はAPI経由で取得しますが、今回はAWSCLIを利用します。引数にcluster-arnの引数に"arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3"
を指定しました。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ aws kafka get-bootstrap-brokers --region ap-northeast-1 --cluster-arn "arn:aws:kafka:ap-northeast-1:1234567890123:cluster/classmethod-cluster/11c734ae-27a6-47bb-9e83-07864d4d77e8-3" { "BootstrapBrokerString": "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" }
MSK(Kafka)にメッセージを送信 - Producer
bin/kafka-console-producer.sh --broker-list "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic
上記コマンド実行後、以下のメッセージをコンソールに続けて入力します。
一つ、人の世の生血をすすり 二つ、ふらちな悪行残名 三つ、醜い浮き世の鬼を退治てくれよう、桃太郎
MSK(Kafka)からメッセージを受信 - Consumer
別のターミナルを立ち上げ、以下のコマンドを実行します。すると、メッセージ送信で入力したメッセージが受信されました。マルチバイト(日本語)も問題ありません。
[ec2-user@ip-10-0-0-168 kafka_2.12-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server "b-1.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.1clielq3iu6uvttpy40k5tna.c3.kafka.ap-northeast-1.amazonaws.com:9092" --topic ClassmethodTopic --from-beginning OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N 一つ、人の世の生血をすすり 二つ、ふらちな悪行残名 三つ、醜い浮き世の鬼を退治てくれよう、桃太郎
最後に
HadoopやSparkを用いたストリームサービスをクラウドにマイグレーションする際に、EMR+セルフマネージドなKafkaをEC2上に構築するではなく、EMR+MSKにできる日が近くなりそうです。プレビューなのでGAが最終的に同じ仕様とは限りませんが、使い勝手の良いサービスになりそうで今から楽しみです。
合わせて読みたい
[新サービス]フルマネージドなApache Kafka、Amazon Managed Streaming for Kafka (MSK)が発表されました #reinvent
[レポート]ANT398 – Amazon Managed Streaming for Kafka (Amazon MSK)入門 #reinvent